package mqtt

import (
	"gitee.com/squbirreland/imgo/mqtt/agreement"
	"gitee.com/squbirreland/imgo/mqtt/agreement/variableHeader"
	"net"
	"sync"
)

// Package-level topic registry.
var (
	// TopicMap indexes every known Topic by its topic name. It is created
	// with an initial capacity hint of 128 entries.
	TopicMap = make(map[string]*Topic, 128)

	// TopicMapLock guards reads and writes of TopicMap.
	TopicMapLock sync.RWMutex
)

// Topic is a single publish/subscribe topic: a name, a queue of pending
// messages, and the connections the messages are fanned out to.
type Topic struct {
	// TopicName is the name of the topic; it is also the key under which
	// the topic is stored in TopicMap.
	TopicName string
	// MessageChan is the topic's message queue. Publish sends payloads into
	// it and the topic's listenMsg goroutine receives from it.
	MessageChan chan *[]byte
	// Subscriber holds the connections subscribed to this topic; every
	// message received from MessageChan is dispatched once per entry.
	Subscriber []*net.Conn
	// SubscriberLock guards Subscriber. listenMsg holds the read lock while
	// iterating; presumably code that modifies Subscriber takes the write
	// lock (not visible here, verify against the subscribe path).
	SubscriberLock *sync.RWMutex
}

// NewTopic builds a Topic called topicName and starts its dispatch goroutine.
//
// The returned topic has an empty subscriber list (capacity 128), a message
// channel buffered to 1024 entries, and its own RWMutex. Before returning,
// a goroutine running listenMsg(management) is launched for the topic.
// NewTopic does not register the topic in TopicMap; that is left to the caller.
func NewTopic(management *mqttManagement, topicName string) *Topic {
	topic := &Topic{
		TopicName:      topicName,
		MessageChan:    make(chan *[]byte, 1024),
		Subscriber:     make([]*net.Conn, 0, 128),
		SubscriberLock: new(sync.RWMutex),
	}
	go topic.listenMsg(management)
	return topic
}

// listenMsg is the topic's dispatch loop. It receives each message from
// t.MessageChan and passes it to management.Parse once per entry in
// t.Subscriber, holding the subscriber read lock for the whole fan-out of
// that message.
//
// The loop runs until t.MessageChan is closed and drained, then returns.
// Ranging over the channel (rather than a bare receive inside `for {}`)
// matters here: a receive on a closed channel never blocks and yields nil,
// so a bare receive would busy-loop dispatching nil messages forever.
func (t *Topic) listenMsg(management *mqttManagement) {
	for msg := range t.MessageChan {
		t.SubscriberLock.RLock()
		for i := range t.Subscriber {
			// The trailing 1 is forwarded as-is; its meaning is defined by Parse.
			management.Parse(t.Subscriber[i], msg, 1)
		}
		t.SubscriberLock.RUnlock()
	}
}

// Publish queues the payload of a PUBLISH packet on the topic named in the
// packet's variable header, creating and registering the topic in TopicMap
// the first time that name is seen.
//
// packet.VariableHeader must hold a *variableHeader.Publish; the type
// assertion panics otherwise. The send on the topic's MessageChan blocks
// when the channel's buffer is full.
func Publish(management *mqttManagement, packet *agreement.Packet) {
	publish := packet.VariableHeader.(*variableHeader.Publish)

	// Fast path: the topic usually exists already, so a read lock suffices.
	TopicMapLock.RLock()
	topic := TopicMap[publish.TopicName]
	TopicMapLock.RUnlock()

	if topic == nil {
		TopicMapLock.Lock()
		// Re-check under the write lock: another goroutine may have created
		// the topic between the RUnlock above and this Lock. Creating it a
		// second time would overwrite the registered topic, leak the first
		// topic's dispatch goroutine, and strand any message queued on it.
		topic = TopicMap[publish.TopicName]
		if topic == nil {
			topic = NewTopic(management, publish.TopicName)
			TopicMap[publish.TopicName] = topic
		}
		TopicMapLock.Unlock()
	}

	b := packet.Payload.GetPayloadBytes()
	topic.MessageChan <- &b
}
